#include "config.h"

#include <time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/poll.h>
#include <sys/epoll.h>
#include <sys/file.h>
#include <sys/mman.h>
#include <linux/fs.h>
#include <unistd.h>
#include <string.h>
#include <stdio.h>
#include <stdint.h>
#include <libgen.h>
#include <ctype.h>
#include <fcntl.h>
#include <libaio.h>
#include <limits.h>
#include <errno.h>
#include <sys/vfs.h>

#define DBG_SUBSYS S_LIBREPLICA

#include "disk.h"

#include "lich_aio.h"
#include "core.h"

#define MBR_OFFSET 0
#define MBR_SIZE 512

/*
 * Descriptor bundle stored behind disk_t::disk_fd for a "normal" disk.
 * All three descriptors refer to the same backing path; they differ only in
 * the open(2) flags chosen by disk_open().
 */
typedef struct {
        int inited;             /* set to 1 by disk_open()/disk_normal_connect(), cleared by disk_close() */

        int io_fd;              /* opened O_RDWR | O_DIRECT (O_SYNC fallback on EINVAL); used for block-aligned raw-chunk AIO */
        int cache_fd;           /* opened O_RDWR without sync flags; set to -1 in per-core copies made by disk_normal_connect() */
        int metadata_fd;        /* opened O_RDWR | O_SYNC; used by io_pread/io_pwrite and for AIO that is not routed to io_fd */
        char path[MAX_PATH_LEN];        /* path string that disk_open() opened the descriptors on */
} disk_normal_fd_t;

/**
 * Initialise a new disk (installed as .create_new in the normal-disk ops;
 * normal_disk_probe_check() calls it when bit 0 of the disk bitmap is clear).
 *
 * Steps, in order:
 *   1. build a diskinfo_t through disk_setinfo();
 *   2. store it in "<home>/info/<idx>.info";
 *   3. set bit 0 of disk->bmap so the disk is no longer seen as new;
 *   4. fsync disk->map_fd unless gloconf.bmap_mem is set;
 *   5. record disk->real_size through disk_extinfo_set().
 *
 * @param disk  disk being initialised
 * @param home  base directory holding the "info/" files
 * @param pool  pool name forwarded to disk_setinfo()
 * @return 0 on success, otherwise the error code of the failing step
 */
int initnew(disk_t *disk, const char *home, const char *pool)
{
        int ret;
        diskinfo_t diskinfo;
        disk_extinfo_t extinfo;
        char path[MAX_PATH_LEN];

        ret = disk_setinfo(home, pool, disk, &diskinfo);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        snprintf(path, MAX_PATH_LEN, "%s/info/%d.info", home, disk->idx);

        ret = _set_value(path, (void *)&diskinfo, sizeof(diskinfo), O_CREAT | O_TRUNC);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = bmap_set(&disk->bmap, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (gloconf.bmap_mem) {
                /* bitmap is kept in memory only, nothing to flush */
        } else {
                ret = fsync(disk->map_fd);
                if (ret)
                        UNIMPLEMENTED(__DUMP__);
        }

        /*
         * Only disk_size is assigned below; zero the structure first so that
         * any other field or padding handed to disk_extinfo_set() is
         * deterministic instead of leftover stack content.
         */
        memset(&extinfo, 0, sizeof(extinfo));
        extinfo.disk_size = disk->real_size;
        ret = disk_extinfo_set(home, disk->idx, &extinfo);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Probe-time sanity check for a normal disk.
 *
 *  - An offline disk is skipped (returns 0).
 *  - A disk whose bitmap bit 0 is clear is treated as new and handed to
 *    disk->dop->create_new().
 *  - Otherwise the diskinfo_t obtained from disk_getinfo() must be
 *    byte-identical to the one stored in "<home>/info/<idx>.info"; on a
 *    mismatch the disk is unlinked and EXIT(EIO) is invoked.
 *
 * @return 0 on success, otherwise a positive error code
 */
int normal_disk_probe_check(disk_t *disk, const char *home, const char *pool)
{
        int ret;
        diskinfo_t probed, recorded;
        char info_path[MAX_PATH_LEN];
        char probed_id[MAX_NAME_LEN], recorded_id[MAX_NAME_LEN];

        if (disk->status & __DISK_OFFLINE__) {
                DWARN("disk[%u] offline\n", disk->idx);
                return 0;
        }

        /* bit 0 clear: the disk has never been initialised */
        if (bmap_get(&disk->bmap, 0) == 0) {
                DINFO("new disk[%u], need init\n", disk->idx);

                ret = disk->dop->create_new(disk, home, pool);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                return 0;
        }

        ret = disk_getinfo(home, disk, &probed);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        snprintf(info_path, MAX_PATH_LEN, "%s/info/%d.info", home, disk->idx);

        /* _get_value() reports failure as a negative value; flip it to positive */
        ret = _get_value(info_path, (void *)&recorded, sizeof(recorded));
        if (ret < 0) {
                ret = -ret;
                GOTO(err_ret, ret);
        }

        if (memcmp(&probed, &recorded, sizeof(probed)) == 0)
                return 0;

        uuid_unparse(probed.diskid, probed_id);
        uuid_unparse(recorded.diskid, recorded_id);
        DWARN("bad disk, id %s : %s, cluster %s : %s, idx %u : %u\n",
                        probed_id, recorded_id, probed.cluster, recorded.cluster,
                        probed.idx, recorded.idx);

        disk_unlink(home, disk);

        EXIT(EIO);

        return 0;
err_ret:
        return ret;
}

/**
 * Flag the disk as offline: logs a warning, then raises __DISK_OFFLINE__ in
 * disk->status.
 */
void offline(disk_t *disk)
{
        DWARN("disk %d offline\n", disk->idx);
        disk->status = disk->status | __DISK_OFFLINE__;
}

/**
 * Check that the disk accepts writes by storing one MBR_SIZE-byte block,
 * prefixed with gloconf.uuid, at DISK_WRITEABLE_OFFSET through
 * disk->dop->io_pwrite().
 *
 * @return TRUE when the whole block was written; FALSE when the disk is
 *         flagged offline or the write fails or is short
 */
int writeable(disk_t *disk)
{
        int ret;
        size_t uuid_len;
        char buf[MBR_SIZE] = {0};

        if (unlikely(disk->status & __DISK_OFFLINE__))
                return FALSE;

        /* never copy more than the probe block can hold */
        uuid_len = strlen(gloconf.uuid);
        if (uuid_len > sizeof(buf))
                uuid_len = sizeof(buf);
        memcpy(buf, gloconf.uuid, uuid_len);

        ret = disk->dop->io_pwrite(disk, buf, sizeof(buf), DISK_WRITEABLE_OFFSET);
        if (unlikely(ret != (int)sizeof(buf))) {
                return FALSE;
        }

        return TRUE;
}

/**
 * Destroy the disk by delegating to disk_erasure_superblock().
 *
 * @return 0 on success, otherwise the error reported by
 *         disk_erasure_superblock()
 */
int destroy(disk_t *disk)
{
        int ret = disk_erasure_superblock(disk);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Open the backing store "<home>/disk/<idx>.disk" of a normal disk.
 *
 * Three descriptors are opened on the same path:
 *   - cache_fd:    O_RDWR
 *   - metadata_fd: O_RDWR | O_SYNC
 *   - io_fd:       O_RDWR | O_DIRECT, retried with O_RDWR | O_SYNC when
 *                  O_DIRECT is rejected with EINVAL
 * The bundle is then published in disk->disk_fd, and the cache setting, base
 * offset and pool name are loaded through the disk_get*() helpers.
 *
 * On any failure every descriptor opened so far is closed, the bundle is
 * freed and disk->disk_fd is reset to NULL (its value on entry), so no
 * dangling pointer or leaked descriptor is left behind.
 *
 * @param disk       disk to open; disk->disk_fd must be NULL on entry
 * @param home       base directory containing "disk/<idx>.disk"
 * @param pool       output buffer filled by disk_getpool()
 * @param disk_size  out: usable size, i.e. device size minus
 *                   disk->disk_base_offset
 * @return 0 on success, otherwise a positive error code
 */
int disk_open(disk_t *disk, const char *home, char *pool, uint64_t *disk_size)
{
        int ret, fd, len;
        struct stat stbuf;
        char target[PATH_MAX];
        disk_normal_fd_t *normal_fd;

        DINFO("open disk %d home %s\n", disk->idx, home);

        YASSERT(disk->disk_fd == NULL);

        len = snprintf(target, sizeof(target), "%s/disk/%d.disk", home, disk->idx);
        if (unlikely(len < 0 || (size_t)len >= sizeof(target))) {
                ret = ENAMETOOLONG;
                GOTO(err_ret, ret);
        }

        ret = disk_get_realpath(target, &stbuf);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (unlikely(gloconf.testing)) {
                *disk_size = stbuf.st_size;
        } else {
                ret = disk_getblocksize(target, disk_size);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        /*
         * target is PATH_MAX bytes but the path[] field is MAX_PATH_LEN bytes;
         * reject anything that would not fit instead of overflowing the field.
         */
        if (unlikely(strlen(target) >= MAX_PATH_LEN)) {
                ret = ENAMETOOLONG;
                GOTO(err_ret, ret);
        }

        ret = ymalloc((void **)&normal_fd, sizeof(disk_normal_fd_t));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* start from a known state so the error path can tell what to close */
        normal_fd->inited = 0;
        normal_fd->io_fd = -1;
        normal_fd->cache_fd = -1;
        normal_fd->metadata_fd = -1;
        snprintf(normal_fd->path, sizeof(normal_fd->path), "%s", target);

        { /* cache_fd */
                fd = open(target, O_RDWR, 0);
                if (fd < 0) {
                        ret = errno;
                        GOTO(err_free, ret);
                }

                normal_fd->cache_fd = fd;
        }

        { /* metadata_fd */
                fd = open(target, O_RDWR | O_SYNC, 0);
                if (fd < 0) {
                        ret = errno;
                        GOTO(err_free, ret);
                }

                normal_fd->metadata_fd = fd;
        }

        { /* io_fd */
                fd = open(target, O_RDWR | O_DIRECT, 0);
                if (fd < 0) {
                        ret = errno;
                        if (ret == EINVAL) {
                                /* O_DIRECT rejected: fall back to synchronous I/O */
                                fd = open(target, O_RDWR | O_SYNC, 0);
                                if (fd < 0) {
                                        ret = errno;
                                        GOTO(err_free, ret);
                                }
                        } else
                                GOTO(err_free, ret);
                }

                normal_fd->io_fd = fd;
        }

        normal_fd->inited = 1;

        /* published before the helpers below run, as in the original ordering */
        disk->disk_fd = normal_fd;
        disk->disk_type = __DISK_TYPE_NORMAL_DISK__;

        ret = disk_getcache(disk, &disk->cache);
        if (unlikely(ret)) {
                GOTO(err_free, ret);
        }

        ret = disk_get_base_offset(target, disk);
        if (unlikely(ret))
                GOTO(err_free, ret);

        YASSERT(disk->disk_base_offset % PAGE_SIZE == 0);
        *disk_size = *disk_size - disk->disk_base_offset;

        ret = disk_getpool(disk, pool);
        if (unlikely(ret))
                GOTO(err_free, ret);

        DINFO("open disk %d (type %d) home %s done\n", disk->idx, disk->disk_type, home);
        return 0;
err_free:
        /* ret already holds the error; close() may clobber errno freely */
        if (normal_fd->io_fd >= 0)
                close(normal_fd->io_fd);
        if (normal_fd->metadata_fd >= 0)
                close(normal_fd->metadata_fd);
        if (normal_fd->cache_fd >= 0)
                close(normal_fd->cache_fd);

        /* never leave the disk pointing at the bundle about to be freed */
        disk->disk_fd = NULL;
        yfree((void **)&normal_fd);
err_ret:
        return ret;
}

/**
 * Close the descriptors opened for a normal disk and free the bundle.
 *
 * May be called more than once: disk->disk_fd is reset to NULL after the
 * first call, and a NULL disk->disk_fd makes the function a no-op apart from
 * the log line.
 *
 * @param disk disk whose descriptor bundle is released
 */
void disk_close(disk_t *disk)
{
        disk_normal_fd_t *fds = (disk_normal_fd_t *)disk->disk_fd;

        DINFO("close disk %d normal_fd %p\n", disk->idx, fds);

        if (fds == NULL)
                return;

        close(fds->io_fd);
        close(fds->cache_fd);
        close(fds->metadata_fd);

        fds->inited = 0;

        yfree((void **)&fds);

        disk->disk_fd = NULL;
}

/**
 * Positional read through the disk's metadata descriptor.
 *
 * The offset is used as given (disk_base_offset is not added) and is asserted
 * to lie below BCACHE_SUPERBLOCK_LEN.
 *
 * @return the value returned by _pread(); negative values indicate failure
 */
static int disk_io_pread(const disk_t *disk, char *buf, size_t size, off_t _offset)
{
        int ret;
        disk_normal_fd_t *fds = (disk_normal_fd_t *)disk->disk_fd;
        uint64_t meta_fd = fds->metadata_fd;

        YASSERT(_offset < BCACHE_SUPERBLOCK_LEN);

        ret = _pread(meta_fd, buf, size, _offset);
        if (ret < 0)
                GOTO(err_ret, ret);

        /* success and failure both hand back _pread()'s result */
err_ret:
        return ret;
}

/**
 * Positional write through the disk's metadata descriptor.
 *
 * The offset is used as given (disk_base_offset is not added) and is asserted
 * to lie below BCACHE_SUPERBLOCK_LEN.
 *
 * @return the value returned by _pwrite(); negative values indicate failure
 */
static int disk_io_pwrite(const disk_t *disk, char *buf, size_t size,  off_t _offset)
{
        int ret;
        disk_normal_fd_t *fds = (disk_normal_fd_t *)disk->disk_fd;
        uint64_t meta_fd = fds->metadata_fd;

        YASSERT(_offset < BCACHE_SUPERBLOCK_LEN);

        ret = _pwrite(meta_fd, buf, size, _offset);
        if (ret < 0)
                GOTO(err_ret, ret);

        /* success and failure both hand back _pwrite()'s result */
err_ret:
        return ret;
}

/*
static int disk_io_preadv(const disk_t *disk, buffer_t *buf, off_t _offset)
{
        int ret,count;
        struct iovec iov[LICH_IOV_MAX];
        disk_normal_fd_t *normal_fd  = (disk_normal_fd_t *)disk->disk_fd;
        uint64_t fd = normal_fd->metadata_fd;

        DBUG("read %lu, size %u offset %lu \n", (uint64_t)fd, buf->len, _offset);

        count = LICH_IOV_MAX;
        if (mbuffer_segcount((buffer_t *)buf) > count) {
                mbuffer_compress((buffer_t *)buf);
        }

        ret = mbuffer_trans(iov, &count, buf);
        YASSERT(ret == (int)buf->len);

        YASSERT(_offset < BCACHE_SUPERBLOCK_LEN);
        ret = preadv((uint64_t)fd, iov, count, _offset);
        if (ret < 0) {
                ret = errno;
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

static int disk_io_pwritev(const disk_t *disk, const buffer_t *buf, off_t _offset)
{
        int ret;
        disk_normal_fd_t *normal_fd  = (disk_normal_fd_t *)disk->disk_fd;
        uint64_t fd = normal_fd->metadata_fd;

        DBUG("write %lu, size %u offset %lu \n", (uint64_t)fd, buf->len,  _offset);

        YASSERT(_offset < BCACHE_SUPERBLOCK_LEN);
        ret = mbuffer_writefile(buf, (uint64_t)fd,  _offset);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}
*/

/**
 * Submit a vectored read of buf->len bytes at (_offset + disk_base_offset)
 * and return once aio_commit() returns.
 *
 * Descriptor selection:
 *   - raw chunks (chkid->type == __RAW_CHUNK__) whose length and offset are
 *     both multiples of LICH_BLOCK_SIZE go through io_fd with
 *     AIO_MODE_DIRECT;
 *   - everything else (metadata, chkid == NULL, unaligned requests) goes
 *     through metadata_fd with AIO_MODE_SYNC.
 *
 * NOTE(review): iocb, iov and task live on this stack frame, so aio_commit()
 * is presumably expected to return only after the request has completed (the
 * "aio yield"/"aio resume" debug lines suggest the task is suspended) —
 * confirm against aio_commit().
 *
 * @param disk    disk to read from
 * @param chkid   chunk id, may be NULL
 * @param buf     destination buffer; compressed in place when it has more
 *                than LICH_IOV_MAX segments
 * @param _offset offset relative to disk->disk_base_offset
 * @param prio    priority forwarded to aio_commit()
 * @return 0 on success, otherwise the error returned by aio_commit()
 */
static int disk_aio_readv(const disk_t *disk, const chkid_t *chkid, buffer_t *buf, off_t _offset, int prio)
{
        int ret, iov_count, mode;
        int fd;         /* plain int: matches io_prep_preadv() and the %d in the log below */
        task_t task;
        struct iocb iocb;
        struct iovec iov[LICH_IOV_MAX];
        disk_normal_fd_t *normal_fd  = (disk_normal_fd_t *)disk->disk_fd;

        /**
         * raw data used direct io(except init data when create a raw), metadata used sync io
         */
        if (likely(chkid && chkid->type == __RAW_CHUNK__
                                && (buf->len % LICH_BLOCK_SIZE) == 0
                                && (_offset % LICH_BLOCK_SIZE) == 0)) {
                fd = normal_fd->io_fd;
                mode = AIO_MODE_DIRECT;
        } else {
                /* 1. metadata 2. raw init data (chkid is NULL) 3. lichfs & lichbd cmd (io not aligned) */
                fd = normal_fd->metadata_fd;
                mode = AIO_MODE_SYNC;
        }

        /* merge segments first when the buffer would not fit in iov[] */
        iov_count = LICH_IOV_MAX;
        if (mbuffer_segcount(buf) > iov_count) {
                mbuffer_compress(buf);
        }

        ret = mbuffer_trans(iov, &iov_count, buf);
        YASSERT(ret == (int)buf->len);

#if ISCSI_IO_RECORD
        if (chkid)
                DINFO("aio read "CHKID_FORMAT" fd %d disk %d base_offset:%ld offset %ld size:%d\n",
                                CHKID_ARG(chkid), fd, disk->idx, disk->disk_base_offset, _offset, buf->len);
#endif

        io_prep_preadv(&iocb, fd, iov, iov_count, _offset + disk->disk_base_offset);

        iocb.aio_reqprio = 0;
        task = schedule_task_get();
        iocb.data = &task;

        ANALYSIS_BEGIN(0);

        DBUG("aio yield, task %u\n", task.taskid);

        ret = aio_commit(&iocb, prio, mode);
        if (unlikely(ret)) {
                /* EINVAL is treated as a programming error, not a runtime condition */
                if (ret == EINVAL)
                        YASSERT(0);
                GOTO(err_ret, ret);
        }

        DBUG("aio resume, task %u\n", task.taskid);

        ANALYSIS_QUEUE(0, IO_WARN, "disk_aio_readv");

        return 0;
err_ret:
        return ret;
}

/**
 * Submit a vectored write of buf->len bytes at (_offset + disk_base_offset)
 * and return once aio_commit() returns.
 *
 * Descriptor selection:
 *   - raw chunks (chkid->type == __RAW_CHUNK__) whose length and offset are
 *     both multiples of LICH_BLOCK_SIZE go through io_fd with
 *     AIO_MODE_DIRECT;
 *   - everything else (metadata, chkid == NULL, unaligned requests) goes
 *     through metadata_fd with AIO_MODE_SYNC.
 *
 * NOTE(review): iocb, iov and task live on this stack frame, so aio_commit()
 * is presumably expected to return only after the request has completed —
 * confirm against aio_commit().
 *
 * @param disk    disk to write to
 * @param chkid   chunk id, may be NULL
 * @param buf     source buffer; although declared const it is passed to
 *                mbuffer_compress() through a cast when it has more than
 *                LICH_IOV_MAX segments
 * @param _offset offset relative to disk->disk_base_offset
 * @param prio    priority forwarded to aio_commit()
 * @return 0 on success, otherwise the error returned by aio_commit()
 */
static int disk_aio_writev(const disk_t *disk, const chkid_t *chkid, const buffer_t *buf, off_t _offset, int prio)
{
        int ret, iov_count, mode;
        int fd;         /* plain int: matches io_prep_pwritev() and the %d in the log below */
        task_t task;
        struct iocb iocb;
        struct iovec iov[LICH_IOV_MAX];
        disk_normal_fd_t *normal_fd  = (disk_normal_fd_t *)disk->disk_fd;

        /**
         * raw data used direct io(except init data when create a raw), metadata used sync io
         */
        if (likely(chkid && chkid->type == __RAW_CHUNK__
                                && (buf->len % LICH_BLOCK_SIZE) == 0
                                && (_offset % LICH_BLOCK_SIZE) == 0)) {
                fd = normal_fd->io_fd;
                mode = AIO_MODE_DIRECT;
        } else {
                /* 1. metadata 2. raw init data (chkid is NULL) 3. lichfs & lichbd cmd (io not aligned) */
                fd = normal_fd->metadata_fd;
                mode = AIO_MODE_SYNC;
        }

        /* merge segments first when the buffer would not fit in iov[] */
        iov_count = LICH_IOV_MAX;
        if (mbuffer_segcount((buffer_t *)buf) > iov_count) {
                mbuffer_compress((buffer_t *)buf);
        }

        ret = mbuffer_trans(iov, &iov_count, buf);
        YASSERT(ret == (int)buf->len);

#if ISCSI_IO_RECORD
        if (chkid)
                DINFO("aio write "CHKID_FORMAT" fd %d disk %d base_offset:%ld offset %ld size:%d\n",
                                CHKID_ARG(chkid), fd, disk->idx, disk->disk_base_offset, _offset, buf->len);
#endif

        io_prep_pwritev(&iocb, fd, iov, iov_count, _offset + disk->disk_base_offset);

        iocb.aio_reqprio = 0;
        task = schedule_task_get();
        iocb.data = &task;

        ANALYSIS_BEGIN(0);

        DBUG("aio yield, task %u offset %ld size:%d\n", task.taskid, _offset, buf->len);

        ret = aio_commit(&iocb, prio, mode);
        if (unlikely(ret)) {
                /* EINVAL is treated as a programming error, not a runtime condition */
                if (ret == EINVAL)
                        YASSERT(0);
                GOTO(err_ret, ret);
        }

        DBUG("aio resume, task %u offset %ld size:%d\n", task.taskid, _offset, buf->len);

        ANALYSIS_QUEUE(0, IO_WARN, "disk_aio_writev");

        return 0;
err_ret:
        return ret;
}

/**
 * Create a private descriptor bundle for newdisk by dup()'ing io_fd and
 * metadata_fd from an already opened disk. cache_fd is not duplicated and is
 * left at -1.
 *
 * newdisk also receives disk's disk_base_offset, idx and dop. On failure
 * nothing is published in newdisk and any descriptor duplicated so far is
 * closed again.
 *
 * @param disk     opened source disk (its disk_fd is dereferenced)
 * @param newdisk  disk structure receiving the duplicated descriptors
 * @return 0 on success, otherwise a positive error code
 */
static int disk_normal_connect(const disk_t *disk, disk_t *newdisk)
{
        int ret, fd;
        disk_normal_fd_t *normal_fd, *pub;
        core_t *core = core_self();

        DINFO("open disk[%d] core[%u]\n", disk->idx, core->hash);

        pub = disk->disk_fd;
        ret = ymalloc((void **)&normal_fd, sizeof(disk_normal_fd_t));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        normal_fd->inited = 0;
        normal_fd->io_fd = -1;
        normal_fd->cache_fd = -1;
        normal_fd->metadata_fd = -1;

        { /* io_fd */
                fd = dup(pub->io_fd);
                if (fd < 0) {
                        ret = errno;
                        GOTO(err_free, ret);
                }

                normal_fd->io_fd = fd;
        }

        { /* metadata_fd */
                fd = dup(pub->metadata_fd);
                if (fd < 0) {
                        ret = errno;
                        /* io_fd was already duplicated and must not leak */
                        GOTO(err_close, ret);
                }

                normal_fd->metadata_fd = fd;
        }

        normal_fd->inited = 1;

        newdisk->disk_fd = normal_fd;
        newdisk->disk_base_offset = disk->disk_base_offset;
        newdisk->idx = disk->idx;
        newdisk->dop = disk->dop;

        return 0;
err_close:
        close(normal_fd->io_fd);
err_free:
        yfree((void **)&normal_fd);
err_ret:
        return ret;
}

/**
 * Release a descriptor bundle created by disk_normal_connect(): closes
 * io_fd and metadata_fd, frees the bundle and resets disk->disk_fd to NULL so
 * the freed memory cannot be reached again (same convention as disk_close()).
 *
 * A disk whose disk_fd is already NULL is left untouched.
 *
 * @param disk disk to disconnect
 */
static void disk_normal_disconnect(disk_t *disk)
{
        disk_normal_fd_t *normal_fd = disk->disk_fd;
        core_t *core = core_self();

        DINFO("close disk[%d] core[%u]\n", disk->idx, core->hash);

        if (normal_fd == NULL)
                return;

        normal_fd->cache_fd = -1;

        close(normal_fd->io_fd);
        close(normal_fd->metadata_fd);
        normal_fd->inited = 0;

        yfree((void **)&normal_fd);

        /* yfree() only sees the local copy; clear the owner's pointer too */
        disk->disk_fd = NULL;
}

/**
 * Report the size of "<home>/disk/<idx>.disk" without opening the disk.
 *
 * When gloconf.testing is set the st_size filled in by disk_get_realpath()
 * is used; otherwise the size comes from disk_getblocksize(). Unlike
 * disk_open(), no base offset is subtracted.
 *
 * @param home       base directory containing "disk/<idx>.disk"
 * @param idx        disk index
 * @param disk_size  out: size in bytes
 * @return 0 on success, ENAMETOOLONG when the path does not fit in PATH_MAX,
 *         otherwise the error of the failing helper
 */
static int __disk_normal_size(const char *home, int idx, uint64_t *disk_size)
{
        int ret, len;
        struct stat stbuf;
        char target[PATH_MAX];

        /* bounded formatting: an over-long home must not overflow target[] */
        len = snprintf(target, sizeof(target), "%s/disk/%d.disk", home, idx);
        if (unlikely(len < 0 || (size_t)len >= sizeof(target))) {
                ret = ENAMETOOLONG;
                GOTO(err_ret, ret);
        }

        ret = disk_get_realpath(target, &stbuf);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (unlikely(gloconf.testing)) {
                *disk_size = stbuf.st_size;
        } else {
                ret = disk_getblocksize(target, disk_size);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Operation table for normal disks, handed out by get_normal_disk_ops().
 * The vectored synchronous hooks (io_preadv / io_pwritev) are intentionally
 * not populated.
 */
static struct disk_op_t __normal_disk_ops__ = {
        /* open / close and per-core descriptor duplication */
        .open        = disk_open,
        .close       = disk_close,
        .connect     = disk_normal_connect,
        .disconnect  = disk_normal_disconnect,

        /* lifecycle and health */
        .create_new  = initnew,
        .probe_check = normal_disk_probe_check,
        .writeable   = writeable,
        .offline     = offline,
        .destroy     = destroy,
        .get_size    = __disk_normal_size,

        /* synchronous positional I/O */
        .io_pread    = disk_io_pread,
        .io_pwrite   = disk_io_pwrite,

        /* asynchronous vectored I/O */
        .aio_readv   = disk_aio_readv,
        .aio_writev  = disk_aio_writev,
};

/**
 * Accessor for the normal-disk operation table.
 *
 * @return pointer to the file-static __normal_disk_ops__ table
 */
struct disk_op_t *get_normal_disk_ops(void)
{
        /* (void) makes this a real prototype; an empty list would accept any arguments */
        return &__normal_disk_ops__;
}
